package Source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

/** Flink streaming job that reads string records from a Kafka topic and prints them. */
object SourceKafka {

  /**
   * Assembles the Kafka client settings handed to the consumer.
   *
   * @return a fresh `Properties` holding the broker address, the consumer group id
   *         and the `auto.offset.reset` setting
   */
  private def buildConsumerSettings(): Properties = {
    val settings = new Properties()
    Seq(
      "bootstrap.servers" -> "master:9092",
      "group.id" -> "cg1",
      "auto.offset.reset" -> "earliest"
    ).foreach { case (key, value) => settings.setProperty(key, value) }
    settings
  }

  /**
   * Entry point: wires a Kafka source for the "javatopic" topic into the stream
   * environment, attaches a print sink, and executes the job.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Each Kafka record is deserialized as a plain string via SimpleStringSchema.
    val kafkaConsumer =
      new FlinkKafkaConsumer[String]("javatopic", new SimpleStringSchema(), buildConsumerSettings())

    val records: DataStream[String] = streamEnv.addSource(kafkaConsumer)
    // The string argument is the label passed to the print sink.
    records.print("sensor Hi,kafka")

    // Nothing above runs until the job is executed.
    streamEnv.execute("source Test")
  }
}
